#!/bin/sh

. "$SPEC_PATH/abstract/backup"

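# Print extra inputs for this test hash: the Citus Postgres version and the
# path hash of the shared abstract backup spec, so that the test hash changes
# whenever either of them does.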
e2e_test_extra_hash() {
  printf '%s\n' E2E_CITUS_POSTGRES_VERSION="$E2E_CITUS_POSTGRES_VERSION"
  "$SHELL" "$PROJECT_PATH/stackgres-k8s/ci/build/build-functions.sh" path_hash \
    "$(realpath --relative-to "$PROJECT_PATH" "$SPEC_PATH/abstract/backup")"
}

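# Install MinIO as object storage, create the sharded cluster and wait for it
# to be ready, seed mock data, take an initial sharded backup and record a
# timestamp between WAL rotations to be used later as the PITR restore target.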
e2e_test_install() {
  E2E_POSTGRES_VERSION="${E2E_CITUS_POSTGRES_VERSION:-$E2E_POSTGRES_VERSION}"
  CLUSTER_NAME="$(get_sgshardedcluster_name "$SPEC_NAME")"

  install_minio

  create_or_replace_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" "3" "2"

  wait_pods_running "$CLUSTER_NAMESPACE" 7
  wait_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  switch_sharded_cluster_to_first "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  create_mock_data

  SHARDED_BACKUP_NAME="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"

  create_sharded_backup "$SHARDED_BACKUP_NAME" false

  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME"

  rotate_wal_file_sharded_cluster

  BACKUP_PITR_DATE="$(date +%Y-%m-%dT%H:%M:%SZ --utc)"

  rotate_wal_file_sharded_cluster

  create_more_mock_data

  rotate_wal_file_sharded_cluster
}

e2e_test() {
  run_test "Checking that backup is working" check_backup_is_working

  run_test "Checking that sharded backup are working" check_sharded_backup_is_working

  run_test "Checking that automatic sharded backup are working" check_automatic_sharded_backup_is_working

  run_test "Checking that restoring a sharded backup is working" check_restore_sharded_backup_is_working

  run_test "Checking that restoring a sharded backup with PITR is working" check_restore_sharded_backup_with_pitr_is_working
}

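# Check that WAL archiving works on the coordinator and on each shard cluster.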
check_backup_is_working() {
  local BACKUP_NAME
  local SHARDED_CLUSTER_NAME="$CLUSTER_NAME"
  local CLUSTER_NAME
  local NODE=0
  local SUFFIX
  for SUFFIX in "coord" "shard0" "shard1"
  do
    CLUSTER_NAME="$SHARDED_CLUSTER_NAME-$SUFFIX"
    check_wal_archive 0
  done
}

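# Create a managed-lifecycle sharded backup, wait for its per-cluster backups
# and the sharded backup itself to complete, verify that the sharded backup
# references the coordinator and shard backups, then create two more managed
# backups and verify the oldest one is removed by the retention policy.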
check_sharded_backup_is_working() {
  SHARDED_BACKUP_NAME_0="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"

  create_sharded_backup "$SHARDED_BACKUP_NAME_0" true

  local BACKUP_NAME
  local SHARDED_CLUSTER_NAME="$CLUSTER_NAME"
  local CLUSTER_NAME
  local NODE=0
  local SUFFIX
  for SUFFIX in coord shard0 shard1
  do
    CLUSTER_NAME="$SHARDED_CLUSTER_NAME-$SUFFIX"
    BACKUP_NAME="$SHARDED_BACKUP_NAME_0-$SUFFIX"
    wait_backup_is_completed "$BACKUP_NAME" "$NODE"
  done

  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME_0"

  if kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" "$SHARDED_BACKUP_NAME_0" \
    --template='{{ range .status.sgBackups }}{{ . }} {{ end }}' \
    | grep -qFx "$SHARDED_BACKUP_NAME_0-coord $SHARDED_BACKUP_NAME_0-shard0 $SHARDED_BACKUP_NAME_0-shard1 "
  then
    success "Backups are referenced correctly in the sharded backup"
  else
    fail "Backups are not referenced correctly in the sharded backup"
  fi

  CLUSTER_NAME="$SHARDED_CLUSTER_NAME"
  SHARDED_BACKUP_NAME_1="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"
  SHARDED_BACKUP_NAME_2="$(get_sgshardedbackup_name "${CLUSTER_NAME}-$(shuf -i 0-65535 -n 1)")"
  create_sharded_backup "$SHARDED_BACKUP_NAME_1" true
  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME_1"
  create_sharded_backup "$SHARDED_BACKUP_NAME_2" true
  wait_sharded_backup_is_completed "$SHARDED_BACKUP_NAME_2"

  if ! kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" "$SHARDED_BACKUP_NAME_0" >/dev/null 2>&1
  then
    success "Sharded backups retention policy is working"
  else
    fail "Sharded backups retention policy is not working"
  fi
}

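# Enable the backup cron schedule so an automatic sharded backup is taken,
# check timelines and control data of every backup it references, and finally
# disable the cron schedule again.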
check_automatic_sharded_backup_is_working() {
  enable_sharded_cluster_cron_schedule

  if wait_until is_automatic_sharded_backup_cr_completed
  then
    success "The full automatic sharded backup is available"
  else
    fail "The full automatic sharded backup is not available"
  fi

  local BACKUP_NAMES

  BACKUP_NAMES="$(kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" \
    --template '{{ range .items }}{{ .status.process.status }} {{ .spec.managedLifecycle }} {{ .spec.sgShardedCluster }} {{ range .status.sgBackups }}{{ . }} {{ end }}{{ print "\n" }}{{ end }}' \
    | grep "^Completed true ${CLUSTER_NAME} " | tail -n 1 | cut -d ' ' -f 4-)"

  if [ -z "$BACKUP_NAMES" ]
  then
    fail "Can not retrieve backups associated to the full automatic sharded backup"
  fi

  local BACKUP_NAME
  local SHARDED_CLUSTER_NAME="$CLUSTER_NAME"
  local CLUSTER_NAME
  local NODE=0
  for BACKUP_NAME in $BACKUP_NAMES
  do
    if CLUSTER_NAME="$(kubectl get sgbackup -n "$CLUSTER_NAMESPACE" "$BACKUP_NAME" \
      --template='{{ .status.process.status }} {{ .spec.sgCluster }}' \
      | grep '^Completed ')"
    then
      CLUSTER_NAME="$(printf %s "$CLUSTER_NAME" | cut -d ' ' -f 2)"

      check_timelines

      check_control_data

      success "The full backup $BACKUP_NAME is available"
    else
      fail "The full backup $BACKUP_NAME is not available"
    fi
  done

  CLUSTER_NAME="$SHARDED_CLUSTER_NAME"
  disable_sharded_cluster_cron_schedule
}


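# Create an SGShardedBackup CR: $1 is the backup name, $2 the value for
# .spec.managedLifecycle.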
create_sharded_backup() {
  cat << EOF | kubectl create -f -
apiVersion: stackgres.io/v1
kind: SGShardedBackup
metadata:
  namespace: "$CLUSTER_NAMESPACE"
  name: "$1"
spec:
  sgShardedCluster: "$CLUSTER_NAME"
  managedLifecycle: $2
EOF
}

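# Wait up to three times E2E_TIMEOUT for the SGShardedBackup CR to reach the
# Completed status.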
wait_sharded_backup_is_completed() {
  local BACKUP_NAME="$1"
  if wait_until -t "$((E2E_TIMEOUT * 3))" eval '[ "$(kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" "$BACKUP_NAME" \
    --template "{{ .status.process.status }}" \
    | grep "^Completed$" | wc -l)" -gt 0 ]'
  then
    success "The sharded backup CR has complete"
  else
    fail "The sharded backup CR has failed"
  fi
}

enable_sharded_cluster_cron_schedule() {
  # Set the cron schedule to run at every minute.
  kubectl patch sgshardedcluster -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" --type json \
    --patch '[{"op":"replace","path":"/spec/configurations/backups/0/cronSchedule","value":"*/1 * * * *"}]'
}

disable_sharded_cluster_cron_schedule() {
  # Set the cron schedule to 05:00 on day-of-month 31 in February (a date that never occurs), effectively disabling automatic backups.
  wait_until kubectl patch sgshardedcluster -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" --type json \
    --patch '[{"op":"replace","path":"/spec/configurations/backups/0/cronSchedule","value":"0 5 31 2 *"}]'
}

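# Return success if a completed, managed-lifecycle sharded backup exists for
# the cluster. Each line produced by the template has the form, for example:
#   Completed true <sharded cluster name>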
is_automatic_sharded_backup_cr_completed() {
  kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" \
    --template '{{ range .items }}{{ .status.process.status }} {{ .spec.managedLifecycle }} {{ .spec.sgShardedCluster }}{{ print "\n" }}{{ end }}' \
    | grep -q "^Completed true ${CLUSTER_NAME}$"
}

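# Pick a completed sharded backup other than the initial one, delete the
# sharded cluster and re-create it restoring from that backup, then verify
# that the cluster becomes ready and that the mock data is back in place.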
check_restore_sharded_backup_is_working() {
  SHARDED_BACKUP_NAME_FOR_RESTORE="$(kubectl get sgshardedbackup -n "$CLUSTER_NAMESPACE" \
    --template '{{ range .items }}{{ printf "%s %s\n" .status.process.status .metadata.name }}{{ end }}' \
    | grep "^Completed " | cut -d ' ' -f 2 | grep -vxF "$SHARDED_BACKUP_NAME" | head -n 1)"

  # kubectl get secret -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" -o json \
  #   | jq "del(.metadata.ownerReferences) | del(.metadata.labels) | .metadata.name = \"$CLUSTER_NAME-back\"" \
  #   | kubectl replace --force -f -

  remove_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  # Remove this when Patroni fixes the Citus integration so that the superuser password is changed based on PATRONI_SUPERUSER_PASSWORD
  # kubectl get secret -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-back" -o json \
  #   | jq ".metadata.name = \"$CLUSTER_NAME\"" \
  #   | kubectl replace --force -f -

  create_or_replace_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" "3" "2" \
    --set-string cluster.initialData.restore.fromBackup.name="$SHARDED_BACKUP_NAME_FOR_RESTORE"

  local RESULT EXIT_CODE
  try_function wait_pods_running "$CLUSTER_NAMESPACE" 7
  if "$RESULT"
  then
    SWITCHOVER_TO_FIRST=true try_function wait_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  fi

  if "$RESULT"
  then
    success "The sharded cluster has been recovered from the sharded backup"
  else
    fail "The sharded cluster has not been recovered from the sharded backup"
  fi

  wait_until check_init_data_after_restore
}

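# Keep a copy of the cluster secret so the superuser password survives the
# cluster re-creation, delete the sharded cluster and re-create it restoring
# from the initial backup with a PITR target, then verify that only the data
# created before the PITR timestamp is present.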
check_restore_sharded_backup_with_pitr_is_working() {
  kubectl get secret -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME" -o json \
    | jq "del(.metadata.ownerReferences) | del(.metadata.labels) | .metadata.name = \"$CLUSTER_NAME-back\"" \
    | kubectl replace --force -f -

  remove_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"

  # Remove this when Patroni fixes the Citus integration so that the superuser password is changed based on PATRONI_SUPERUSER_PASSWORD
  kubectl get secret -n "$CLUSTER_NAMESPACE" "$CLUSTER_NAME-back" -o json \
    | jq ".metadata.name = \"$CLUSTER_NAME\"" \
    | kubectl replace --force -f -

  create_or_replace_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE" "3" "2" \
    --set-string cluster.initialData.restore.fromBackup.name="$SHARDED_BACKUP_NAME" \
    --set-string cluster.initialData.restore.fromBackup.pointInTimeRecovery.restoreToTimestamp="$BACKUP_PITR_DATE"

  local RESULT EXIT_CODE
  try_function wait_pods_running "$CLUSTER_NAMESPACE" 7
  if "$RESULT"
  then
    SWITCHOVER_TO_FIRST=true try_function wait_sharded_cluster "$CLUSTER_NAME" "$CLUSTER_NAMESPACE"
  fi

  if "$RESULT"
  then
    success "The sharded cluster has been recovered with PITR from the sharded backup"
  else
    fail "The sharded cluster has not been recovered with PITR from the sharded backup"
  fi

  wait_until check_init_data_after_pitr
}

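# Create a local table (fibonacci_global) and a Citus distributed table
# (fibonacci) on the coordinator and insert the first Fibonacci numbers, both
# as-is and offset by 134217728 (2^27), presumably so that the rows are spread
# across different shards.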
create_mock_data() {
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "CREATE TABLE fibonacci_global(num integer, PRIMARY KEY (num));"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci_global(num) VALUES (1);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci_global(num) VALUES (2);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci_global(num) VALUES (3);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "CREATE TABLE fibonacci(num integer, PRIMARY KEY (num));"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "SELECT create_distributed_table('fibonacci', 'num');"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (1);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (2);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (3);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (1 + 134217728);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (2 + 134217728);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (3 + 134217728);"
}

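# Insert the next Fibonacci numbers after BACKUP_PITR_DATE has been recorded;
# these rows must be present after a plain restore but absent after a PITR
# restore to that timestamp.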
create_more_mock_data() {
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci_global(num) VALUES (5);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci_global(num) VALUES (8);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci_global(num) VALUES (13);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (5);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (8);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (13);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (5 + 134217728);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (8 + 134217728);"
  run_query -x "$CLUSTER_NAME-coord-0" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "INSERT INTO fibonacci(num) VALUES (13 + 134217728);"
}

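# For the coordinator and each shard primary: record the current WAL file
# name, force a CHECKPOINT, switch to a new WAL file and wait until the
# recorded WAL file can be fetched from the object storage with wal-g.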
rotate_wal_file_sharded_cluster() {
  local POD
  for POD in "$CLUSTER_NAME-coord-0" "$CLUSTER_NAME-shard0-0" "$CLUSTER_NAME-shard1-0"
  do
    run_query -x "$POD" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" \
      -q "SELECT r.file_name from pg_walfile_name_offset(pg_current_wal_lsn()) as r" > "$LOG_PATH/$POD-current-wal-file"
    run_query -x "$POD" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" -q "CHECKPOINT;"
  done
  for POD in "$CLUSTER_NAME-coord-0" "$CLUSTER_NAME-shard0-0" "$CLUSTER_NAME-shard1-0"
  do
    CURRENT_WAL_FILE="$(cat "$LOG_PATH/$POD-current-wal-file")"
    wait_until -t "$((E2E_TIMEOUT / 5))" eval '[ "$(run_query -x "$POD" -p 5432 -i 0 -n "$CLUSTER_NAMESPACE" -c "$CLUSTER_NAME" -d "citus" \
      -q "SELECT r.file_name from pg_walfile_name_offset(pg_switch_wal()) as r")" != "$CURRENT_WAL_FILE" ]'
    wait_until -t "$((E2E_TIMEOUT / 5))" timeout -s KILL "$((E2E_TIMEOUT / 20))" \
      kubectl exec -n "$CLUSTER_NAMESPACE" "$POD" -c patroni -- \
      exec-with-env backup -- wal-g wal-fetch "$CURRENT_WAL_FILE" "/tmp/$CURRENT_WAL_FILE"
  done
}

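# Verify that, after the restore, the distributed table contains all twelve
# rows inserted by create_mock_data and create_more_mock_data.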
check_init_data_after_restore() {
  local RESULT
  RESULT="$(run_query -p 5432 -i 1 -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "citus" \
    -q "SELECT num FROM fibonacci ORDER BY num;")"
  if printf '%s' "$RESULT" \
    | tr -d '\n' \
    | grep -q "^1235813134217729134217730134217731134217733134217736134217741$"
  then
    success "restore primary db restored successfully"
  else
    fail "primary db not restored successfully"
  fi
}

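# Verify that, after the PITR restore, the distributed table contains only the
# six rows inserted before BACKUP_PITR_DATE.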
check_init_data_after_pitr() {
  local RESULT
  RESULT="$(run_query -p 5432 -i 1 -h "$CLUSTER_NAME" -c "$CLUSTER_NAME" -n "$CLUSTER_NAMESPACE" -d "citus" \
    -q "SELECT num FROM fibonacci ORDER BY num;")"
  if printf '%s' "$RESULT" \
    | tr -d '\n' \
    | grep -q "^123134217729134217730134217731$"
  then
    success "restore primary db restored with PITR successfully"
  else
    fail "primary db not restored with PITR successfully"
  fi
}
